package com.boot.mq.rabbit.handler.canal;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.boot.base.common.utils.ErrorUtils;
import com.boot.mq.rabbit.handler.MessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * FlatMessage 消息处理器
 * @author zongbo1
 */
@Slf4j
@Component("redisMessageHandler")
public class CanalMessageHandler implements MessageHandler<FlatMessage> {
    @Autowired
    List<CanalTableHandler> tableHandlerList;

    @Override
    public boolean handle(FlatMessage flatMessage) {
        AtomicBoolean isHandled = new AtomicBoolean(false);
        String table = flatMessage.getDatabase()+"."+flatMessage.getTable().toLowerCase() ;
        CanalEntry.EventType eventType = CanalEntry.EventType.valueOf(flatMessage.getType());
        tableHandlerList.forEach(handler -> {
            try{
                if(!handler.supportTable().equals(table)){
                    return ;
                }
                switch (eventType){
                    case INSERT:
                        handler.insert(flatMessage);
                        isHandled.set(true);
                        break ;
                    case UPDATE:
                        handler.update(flatMessage);
                        isHandled.set(true);
                        break ;
                    case DELETE:
                        handler.delete(flatMessage);
                        isHandled.set(true);
                        break ;
                    default:
                        break;
                }
            }catch (Exception e){
                e.printStackTrace();
                log.error("canal数据同步异常，error={} \r\n flatMessage = {}", ErrorUtils.getErrorMsg(e),flatMessage);
            }

        });

        return isHandled.get() ;

    }
}
